---
toc_max_heading_level: 4
sidebar_position: 4
sidebar_label: Go
title: TDengine TSDB Go Connector
---

import Tabs from '@theme/Tabs';
import TabItem from '@theme/TabItem';

import RequestId from "./_request_id.mdx";

`driver-go` 是 TDengine TSDB 的官方 Go 语言连接器，实现了 Go 语言 [database/sql](https://golang.org/pkg/database/sql/)  包的接口。Go 开发人员可以通过它开发存取 TDengine TSDB 集群数据的应用软件。

## Go 版本兼容性

支持 Go 1.14 及以上版本。

## 支持的平台

- 原生连接支持的平台和 TDengine TSDB 客户端驱动支持的平台一致。
- WebSocket 连接支持所有能运行 Go 的平台。

## 版本历史

| driver-go 版本 | 主要变化                                       | TDengine TSDB 版本 |
|--------------|--------------------------------------------|------------------|
| v3.7.6       | 修复原生连接 stmt2 绑定空字符串和空字节数组崩溃                | -                |
| v3.7.5       | 修复 Windows 上原生连接失败                         | -                |
| v3.7.4       | 支持链接级别时区设置                                 | -                |
| v3.7.3       | 修复 WebSocket 连接 stmt 查询结果包含 decimal 数据崩溃   | -                |
| v3.7.2       | 支持 BLOB 类型                                 | -                |
| v3.7.1       | 支持 ipv6 连接                                 | -                |
| v3.7.0       | 支持 decimal 类型                              | 3.3.6.0 及更高版本    |
| v3.6.0       | stmt2 原生接口，DSN 支持密码包含特殊字符（url.QueryEscape） | 3.3.5.0 及更高版本    |
| v3.5.8       | 修复空指针异常                                    | -                |
| v3.5.7       | taosWS 和 taosRestful 支持传入 request id       | -                |
| v3.5.6       | 提升 WebSocket 查询和写入性能                       | 3.3.2.0 及更高版本    |
| v3.5.5       | restful 支持跳过 ssl 证书检查                      | -                |
| v3.5.4       | 兼容 TDengine TSDB 3.3.0.0 tmq raw data      | -                |
| v3.5.3       | 重构 taosWS                                  | -                |
| v3.5.2       | WebSocket 压缩和优化消息订阅性能                      | 3.2.3.0 及更高版本    |
| v3.5.1       | 原生 stmt 查询和 geometry 类型支持                  | 3.2.1.0 及更高版本    |
| v3.5.0       | 获取消费进度及按照指定进度开始消费                          | 3.0.5.0 及更高版本    |
| v3.3.1       | 基于 WebSocket 的 schemaless 协议写入             | 3.0.4.1 及更高版本    |
| v3.1.0       | 提供贴近 kafka 的订阅 api                         | -                |
| v3.0.4       | 新增 request id 相关接口                         | 3.0.2.2 及更高版本    |
| v3.0.3       | 基于 WebSocket 的 statement 写入                | -                |
| v3.0.2       | 基于 WebSocket 的数据查询和写入                      | 3.0.1.5 及更高版本    |
| v3.0.1       | 基于 WebSocket 的消息订阅                         | -                |
| v3.0.0       | 适配 TDengine TSDB 3.0 查询和写入                 | 3.0.0.0 及更高版本    |

## 异常和错误码

如果是 TDengine TSDB 错误可以通过以下方式获取错误码和错误信息。

```go
// import "github.com/taosdata/driver-go/v3/errors"
    if err != nil {
        tError, is := err.(*errors.TaosError)
        if is {
            fmt.Println("errorCode:", int(tError.Code))
            fmt.Println("errorMessage:", tError.ErrStr)
        } else {
            fmt.Println(err.Error())
        }
    }
```

TDengine TSDB 其他功能模块的报错，请参考 [错误码](../../../reference/error-code)

## 数据类型映射

| TDengine TSDB DataType | Go Type   |
|-------------------|-----------|
| TIMESTAMP         | time.Time |
| TINYINT           | int8      |
| SMALLINT          | int16     |
| INT               | int32     |
| BIGINT            | int64     |
| TINYINT UNSIGNED  | uint8     |
| SMALLINT UNSIGNED | uint16    |
| INT UNSIGNED      | uint32    |
| BIGINT UNSIGNED   | uint64    |
| FLOAT             | float32   |
| DOUBLE            | float64   |
| BOOL              | bool      |
| BINARY            | string    |
| NCHAR             | string    |
| JSON              | []byte    |
| GEOMETRY          | []byte    |
| VARBINARY         | []byte    |
| DECIMAL           | string    |
| BLOB              | []byte    |

**注意**：JSON 类型仅在 tag 中支持。
GEOMETRY 类型是 little endian 字节序的二进制数据，符合 WKB 规范。详细信息请参考 [数据类型](../../taos-sql/datatype)
WKB 规范请参考[Well-Known Binary (WKB)](https://libgeos.org/specifications/wkb/)

## 示例程序汇总

示例程序源码请参考：[示例程序](https://github.com/taosdata/driver-go/tree/main/examples)

## 常见问题

1. `readBufferSize` 参数调大后无明显效果

   `readBufferSize` 调大后会减少获取结果时 `syscall` 的调用。如果查询结果的数据量不大，修改该参数不会带来明显提升，如果该参数修改过大，瓶颈会在解析 JSON 数据。如果需要优化查询速度，需要根据实际情况调整该值来达到查询效果最优。

2. `disableCompression` 参数设置为 `false` 时查询效率降低

   当 `disableCompression` 参数设置为 `false` 时查询结果会使用 `gzip` 压缩后传输，拿到数据后要先进行 `gzip` 解压。

3. `go get` 命令无法获取包，或者获取包超时

  设置 Go 代理 `go env -w GOPROXY=https://goproxy.cn,direct`。

4. 查询结果时区处理

  WebSocket 连接和原生连接查询结果使用 `time.Unix` 从时间戳转成本地时区。

## API 参考

### database/sql 驱动

`driver-go` 实现了 Go 的 `database/sql/driver` 接口，可以直接使用 Go 的 `database/sql` 包。提供了两个驱动：`github.com/taosdata/driver-go/v3/taosSql` 和 `github.com/taosdata/driver-go/v3/taosWS` 分别对应 `原生连接` 和 `WebSocket 连接`。

#### DSN 规范

数据源名称具有通用格式，例如 [PEAR DB](http://pear.php.net/manual/en/package.database.db.intro-dsn.php)，但没有类型前缀（方括号表示可选）：

``` text
[username[:password]@][protocol[(address)]]/[dbname][?param1=value1&...&paramN=valueN]
```

完整形式的 DSN：

```text
username:password@protocol(address)/dbname?param=value
```

当密码中包含特殊字符时，需要使用 `url.QueryEscape` 进行转义。

当使用 IPv6 地址时（v3.7.1 及以上版本支持），地址需要用方括号括起来，例如：

```text
root:taosdata@ws([::1]:6041)/testdb
```

##### 原生连接

导入驱动：

```go
import (
    "database/sql"
    _ "github.com/taosdata/driver-go/v3/taosSql"
)
```

使用 `taosSql` 作为 `driverName` 并且使用一个正确的 DSN 作为 `dataSourceName` 如下：

```go
var taosUri = "root:taosdata@tcp(localhost:6030)/"
taos, err := sql.Open("taosSql", taosUri)
```

支持的 DSN 参数：

- `cfg` 指定 taos.cfg 目录。
- `cgoThread` 指定 cgo 同时执行的数量，默认为系统核数。
- `cgoAsyncHandlerPoolSize` 指定异步函数的 handle 大小，默认为 10000。
- `timezone` 指定连接使用的时区，sql 解析以及查询结果都会按照此时区进行转换，只支持 IANA 时区格式，特殊字符需要进行编码，以上海时区（`Asia/Shanghai`）为例：`timezone=Asia%2FShanghai`。

##### WebSocket 连接

导入驱动：

```go
import (
    "database/sql"
    _ "github.com/taosdata/driver-go/v3/taosWS"
)
```

使用 `taosWS` 作为 `driverName` 并且使用一个正确的 DSN 作为 `dataSourceName` 如下：

```go
var taosUri = "root:taosdata@ws(localhost:6041)/"
taos, err := sql.Open("taosWS", taosUri)
```

支持的 DSN 参数：

- `enableCompression` 是否发送压缩数据，默认为 false 不发送压缩数据，如果传输数据使用压缩设置为 true。
- `readTimeout` 读取数据的超时时间，默认为 5m。
- `writeTimeout` 写入数据的超时时间，默认为 10s。
- `timezone` 指定连接使用的时区，sql 解析以及查询结果都会按照此时区进行转换，只支持 IANA 时区格式，特殊字符需要进行编码，以上海时区（`Asia/Shanghai`）为例：`timezone=Asia%2FShanghai`。

### 连接功能

Go 驱动支持创建连接，返回支持 `sql/driver` 标准的 `Connector` 接口的对象，还提供了 `af` 包，扩充了一些无模式写入接口。

#### 标准接口

`database/sql` 包中创建连接的接口

- `func Open(driverName, dataSourceName string) (*DB, error)`
  - **接口说明**：(`database/sql`) 连接数据库
  - **参数说明**：
    - `driverName`：驱动名称。
    - `dataSourceName`：连接参数 DSN。
  - **返回值**：连接对象，错误信息。

#### 扩展接口

`af` 包中创建连接的接口

- `func Open(host, user, pass, db string, port int) (*Connector, error)`
  - **接口说明**：连接数据库。
  - **参数说明**：
    - `host`：主机地址。
    - `user`：用户名。
    - `pass`：密码。
    - `db`：数据库名称。
    - `port`：端口号。
  - **返回值**：连接对象，错误信息。

- `func (conn *Connector) SetTimezone(timezone string) error`
  - **接口说明**：设置连接时区。
  - **参数说明**：
    - `timezone`：时区字符串，使用 IANA 时区格式，例如：`Asia/Shanghai`。
  - **返回值**：错误信息。

#### 无模式写入

`af` 包中使用原生连接进行无模式写入的接口。

- `func (conn *Connector) InfluxDBInsertLines(lines []string, precision string) error`
  - **接口说明**：无模式写入 influxDB 格式数据。
  - **参数说明**：
    - `lines`：写入的数据。
    - `precision`：时间精度。
  - **返回值**：错误信息。

- `func (conn *Connector) OpenTSDBInsertJsonPayload(payload string) error`
  - **接口说明**：无模式写入 OpenTSDB JSON 格式数据。
  - **参数说明**：
    - `payload`：写入的数据。
  - **返回值**：错误信息。

- `func (conn *Connector) OpenTSDBInsertTelnetLines(lines []string) error`
  - **接口说明**：无模式写入 OpenTSDB Telnet 格式数据。
  - **参数说明**：
    - `lines`：写入的数据。
  - **返回值**：错误信息。

`ws/schemaless` 包中使用 WebSocket 无模式写入的接口

- `func (s *Schemaless) Insert(lines string, protocol int, precision string, ttl int, reqID int64) error`
  - **接口说明**：无模式写入数据。
  - **参数说明**：
    - `lines`：写入的数据。
    - `protocol`：写入的数据协议支持的协议 `InfluxDBLineProtocol = 1` `OpenTSDBTelnetLineProtocol = 2` `OpenTSDBJsonFormatProtocol = 3`。
    - `precision`：时间精度。
    - `ttl`：数据过期时间，0 表示不过期。
    - `reqID`：请求 ID。
  - **返回值**：错误信息。

### 执行 SQL

Go 驱动提供了符合 `database/sql` 标准的接口，支持以下功能：

1. **执行 SQL 语句**：执行静态 SQL 语句，并返回其生成的结果对象。
2. **查询执行**：可以执行返回数据集的查询（`SELECT` 语句）。
3. **更新执行**：可以执行影响行数的 SQL 语句，如 `INSERT`、`UPDATE`、`DELETE` 等。
4. **获取结果**：可以获取查询执行后返回的结果集，并遍历查询返回的数据。
5. **获取更新计数**：对于非查询 SQL 语句，可以获取执行后影响的行数。
6. **关闭资源**：释放数据库资源。

#### 标准接口

- `func (db *DB) Close() error`
  - **接口说明**：关闭连接。
  - **返回值**：错误信息。

- `func (db *DB) Exec(query string, args ...any) (Result, error)`
  - **接口说明**：执行查询但不返回任何行。
  - **参数说明**：
    - `query`：要执行的命令。
    - `args`：命令参数。
  - **返回值**：Result 对象（只有影响行数），错误信息。

- `func (db *DB) Query(query string, args ...any) (*Rows, error)`
  - **接口说明**：执行查询并返回行的结果。
  - **参数说明**：
    - `query`：要执行的命令。
    - `args`：命令参数。
  - **返回值**：Rows 对象，错误信息。

- `func (db *DB) QueryRow(query string, args ...any) *Row`
  - **接口说明**：执行查询并返回一行结果。
  - **参数说明**：
    - `query`：要执行的命令。
    - `args`：命令参数。
  - **返回值**：Row 对象。

#### 扩展接口

- `func (db *DB) ExecContext(ctx context.Context, query string, args ...any) (Result, error)`
  - **接口说明**：执行查询但不返回任何行。
  - **参数说明**：
    - `ctx`：上下文，使用 Value 传递请求 id 进行链路追踪，key 为 `taos_req_id` value 为 int64 类型值。
    - `query`：要执行的命令。
    - `args`：命令参数。
  - **返回值**：结果 Result 对象（只有影响行数），错误信息。

- `func (db *DB) QueryContext(ctx context.Context, query string, args ...any) (*Rows, error)`
  - **接口说明**：执行查询并返回行结果。
  - **参数说明**：
    - `ctx`：上下文，使用 Value 传递请求 id 进行链路追踪，key 为 `taos_req_id` value 为 int64 类型值。
    - `query`：要执行的命令。
    - `args`：命令参数。
  - **返回值**：结果集 Rows 对象，错误信息。

- `func (db *DB) QueryRowContext(ctx context.Context, query string, args ...any) *Row`
  - **接口说明**：执行查询并返回一行结果，错误信息会在扫描 Row 时延迟返回。
  - **参数说明**：
    - `ctx`：上下文，使用 Value 传递请求 id 进行链路追踪，key 为 `taos_req_id` value 为 int64 类型值。
    - `query`：要执行的命令。
    - `args`：命令参数。
  - **返回值**：单行结果 Row 对象。

### 结果获取

Go 驱动支持获取查询结果集，以及对应的结果集元数据，提供了用于读取结果集中元数据和数据的方法。

#### 结果集

通过 `Rows` 对象获取查询结果集，提供了以下方法：

- `func (rs *Rows) Next() bool`
  - **接口说明**：准备下一行数据。
  - **返回值**：是否有下一行数据。

- `func (rs *Rows) Columns() ([]string, error)`
  - **接口说明**：返回列名。
  - **返回值**：列名，错误信息。

- `func (rs *Rows) Scan(dest ...any) error`
  - **接口说明**：将当前行的列值复制到 dest 指向的值中。
  - **参数说明**：
    - `dest`：目标值。
  - **返回值**：错误信息。

- `func (rs *Rows) Close() error`
  - **接口说明**：关闭行。
  - **返回值**：错误信息。

- `func (r *Row) Scan(dest ...any) error`
  - **接口说明**：将当前行的列值复制到 dest 指向的值中。
  - **参数说明**：
    - `dest`：目标值。
  - **返回值**：错误信息。

通过 `Result` 对象获取更新结果集，提供了以下方法：

- `func (dr driverResult) RowsAffected() (int64, error)`
  - **接口说明**：返回受影响的行数。
  - **返回值**：受影响的行数，错误信息。

#### 结果集元数据

通过 `Rows` 对象获取查询结果集元数据，提供了以下方法：

- `func (rs *Rows) ColumnTypes() ([]*ColumnType, error)`
  - **接口说明**：返回列类型。
  - **返回值**：列类型，错误信息。

- `func (ci *ColumnType) Name() string`
  - **接口说明**：返回列名。
  - **返回值**：列名。

- `func (ci *ColumnType) Length() (length int64, ok bool)`
  - **接口说明**：返回列长度。
  - **返回值**：列长度，是否有长度。

- `func (ci *ColumnType) ScanType() reflect.Type`
  - **接口说明**：返回列类型对应的 Go 类型。
  - **返回值**：列类型。

- `func (ci *ColumnType) DatabaseTypeName() string`
  - **接口说明**：返回列类型数据库名称。
  - **返回值**：列类型名称。

### 参数绑定

Prepare 允许使用预编译的 SQL 语句，可以提高性能并提供参数化查询的能力，从而增加安全性。

#### 标准接口

使用 `sql/driver` 的 `Conn` 接口中的 `Prepare` 方法准备一个与此连接绑定的准备好的语句，返回 `Stmt` 对象，使用。

- `Prepare(query string) (Stmt, error)`
  - **接口说明**：准备返回一个与此连接绑定的准备好的语句 (statement)。
  - **参数说明**：
    - `query`：要进行参数绑定的语句。
  - **返回值**：Stmt 对象，错误信息。

- `func (s *Stmt) Exec(args ...any) (Result, error)`
  - **接口说明**：使用给定的参数执行准备好的语句并返回总结该语句效果的结果（只可以绑定列值，不支持绑定表名和 tag）。
  - **参数说明**：
    - `args`：命令参数，Go 原始类型会自动转换数据库类型，类型不匹配可能会丢精度，建议使用与数据库相同的类型，时间类型使用 int64 或 `RFC3339Nano` 格式化后的字符串。
  - **返回值**：结果 Result 对象（只有影响行数），错误信息。

- `func (s *Stmt) Query(args ...any) (*Rows, error)`
  - **接口说明**：使用给定的参数执行准备好的语句并返回行的结果。
  - **参数说明**：
    - `args`：命令参数，Go 原始类型会自动转换数据库类型，类型不匹配可能会丢精度，建议使用与数据库相同的类型，时间类型使用 int64 或 `RFC3339Nano` 格式化后的字符串。
  - **返回值**：结果集 Rows 对象，错误信息。

- `func (s *Stmt) Close() error`
  - **接口说明**：关闭语句。
  - **返回值**：错误信息。

#### 扩展接口

`af` 包中提供了使用原生连接进行参数绑定的更多接口

- `func (conn *Connector) Stmt() *Stmt`
  - **接口说明**：返回一个与此连接绑定的 Stmt 对象。
  - **返回值**：Stmt 对象。

- `func (s *Stmt) Prepare(sql string) error`
  - **接口说明**：准备一个 sql。
  - **参数说明**：
    - `sql`：要进行参数绑定的语句。
  - **返回值**：错误信息。

- `func (s *Stmt) NumParams() (int, error)`
  - **接口说明**：返回参数数量。
  - **返回值**：参数数量，错误信息。

- `func (s *Stmt) SetTableNameWithTags(tableName string, tags *param.Param)`
  - **接口说明**：设置表名和 tag。
  - **参数说明**：
    - `tableName`：表名。
    - `tags`：tag。
  - **返回值**：错误信息。

- `func (s *Stmt) SetTableName(tableName string) error`
  - **接口说明**：设置表名。
  - **参数说明**：
    - `tableName`：表名。
  - **返回值**：错误信息。

- `func (s *Stmt) BindRow(row *param.Param) error`
  - **接口说明**：绑定行。
  - **参数说明**：
    - `row`：行数据。
  - **返回值**：错误信息。

- `func (s *Stmt) GetAffectedRows() int`
  - **接口说明**：获取受影响的行数。
  - **返回值**：受影响的行数。

- `func (s *Stmt) AddBatch() error`
  - **接口说明**：添加批处理。
  - **返回值**：错误信息。

- `func (s *Stmt) Execute() error`
  - **接口说明**：执行批处理。
  - **返回值**：错误信息。

- `func (s *Stmt) UseResult() (driver.Rows, error)`
  - **接口说明**：使用结果。
  - **返回值**：结果集 Rows 对象，错误信息。

- `func (s *Stmt) Close() error`
  - **接口说明**：关闭语句。
  - **返回值**：错误信息。

从 3.6.0 版本开始，提供 stmt2 绑定参数的接口

- `func (conn *Connector) Stmt2(reqID int64, singleTableBindOnce bool) *Stmt2`
  - **接口说明**：从连接创建 stmt2。
  - **参数说明**：
    - `reqID`：请求 ID。
    - `singleTableBindOnce`：单个子表在单次执行中只有一次数据绑定。
  - **返回值**：stmt2 对象。
- `func (s *Stmt2) Prepare(sql string) error`
  - **接口说明**：绑定 sql 语句。
  - **参数说明**：
    - `sql`：要绑定的 sql 语句。
  - **返回值**：错误信息。
- `func (s *Stmt2) Bind(params []*stmt.TaosStmt2BindData) error`
  - **接口说明**：绑定数据。
  - **参数说明**：
    - params 要绑定的数据。
  - **返回值**：错误信息。
- `func (s *Stmt2) Execute() error`
  - **接口说明**：执行语句。
  - **返回值**：错误信息。
- `func (s *Stmt2) GetAffectedRows() int`
  - **接口说明**：获取受影响行数（只在插入语句有效）。
  - **返回值**：受影响行数。
- `func (s *Stmt2) UseResult() (driver.Rows, error)`
  - **接口说明**：获取结果集（只在查询语句有效）。
  - **返回值**：结果集 Rows 对象，错误信息。
- `func (s *Stmt2) Close() error`
  - **接口说明**：关闭 stmt2。
  - **返回值**：错误信息。

`ws/stmt` 包提供了通过 WebSocket 进行参数绑定的接口

- `func (c *Connector) Init() (*Stmt, error)`
  - **接口说明**：初始化。
  - **返回值**：Stmt 对象，错误信息。

- `func (s *Stmt) Prepare(sql string) error`
  - **接口说明**：准备一个 sql。
  - **参数说明**：
    - `sql`：要进行参数绑定的语句。
  - **返回值**：错误信息。

- `func (s *Stmt) SetTableName(name string) error`
  - **接口说明**：设置表名。
  - **参数说明**：
    - `name`：表名。
  - **返回值**：错误信息。

- `func (s *Stmt) SetTags(tags *param.Param, bindType *param.ColumnType)`
  - **接口说明**：设置 tag。
  - **参数说明**：
    - `tags`：tag。
    - `bindType`：类型信息。
  - **返回值**：错误信息。

- `func (s *Stmt) BindParam(params []*param.Param, bindType *param.ColumnType) error`
  - **接口说明**：绑定参数。
  - **参数说明**：
    - `params`：参数。
    - `bindType`：类型信息。
  - **返回值**：错误信息。

- `func (s *Stmt) AddBatch() error`
  - **接口说明**：添加批处理。
  - **返回值**：错误信息。

- `func (s *Stmt) Exec() error`
  - **接口说明**：执行批处理。
  - **返回值**：错误信息。

- `func (s *Stmt) GetAffectedRows() int`
  - **接口说明**：获取受影响的行数。
  - **返回值**：受影响的行数。

- `func (s *Stmt) UseResult() (*Rows, error)`
  - **接口说明**：使用结果。
  - **返回值**：Rows 对象，错误信息。

- `func (s *Stmt) Close() error`
  - **接口说明**：关闭语句。
  - **返回值**：错误信息。

Rows 行结果参考 `sql/driver` 包中的 `Rows` 接口，提供以下接口

- `func (rs *Rows) Columns() []string`
  - **接口说明**：返回列名。
  - **返回值**：列名。

- `func (rs *Rows) ColumnTypeDatabaseTypeName(i int) string`
  - **接口说明**：返回列类型数据库名称。
  - **参数说明**：
    - `i`：列索引。
  - **返回值**：列类型名称。

- `func (rs *Rows) ColumnTypeLength(i int) (length int64, ok bool)`
  - **接口说明**：返回列长度。
  - **参数说明**：
    - `i`：列索引。
  - **返回值**：列长度，是否有长度。

- `func (rs *Rows) ColumnTypeScanType(i int) reflect.Type`
  - **接口说明**：返回列类型对应的 Go 类型。
  - **参数说明**：
    - `i`：列索引。
  - **返回值**：列类型。

- `func (rs *Rows) Next(dest []driver.Value) error`
  - **接口说明**：准备下一行数据，并赋值给目标。
  - **参数说明**：
    - `dest`：目标值。
  - **返回值**：错误信息。

- `func (rs *Rows) Close() error`
  - **接口说明**：关闭行。
  - **返回值**：错误信息。

`common/param` 包中提供了参数绑定数据结构

以下是按照偏移设置参数的接口：

- `func NewParam(size int) *Param`
  - **接口说明**：创建一个参数绑定数据结构。
  - **参数说明**：
    - `size`：参数数量。
  - **返回值**：Param 对象。

- `func (p *Param) SetBool(offset int, value bool)`
  - **接口说明**：设置布尔值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：布尔值。

- `func (p *Param) SetNull(offset int)`
  - **接口说明**：设置空值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。

- `func (p *Param) SetTinyint(offset int, value int)`
  - **接口说明**：设置 Tinyint 值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：Tinyint 值。

- `func (p *Param) SetSmallint(offset int, value int)`
  - **接口说明**：设置 Smallint 值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：Smallint 值。

- `func (p *Param) SetInt(offset int, value int)`
  - **接口说明**：设置 Int 值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：Int 值。

- `func (p *Param) SetBigint(offset int, value int)`
  - **接口说明**：设置 Bigint 值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：Bigint 值。

- `func (p *Param) SetUTinyint(offset int, value uint)`
  - **接口说明**：设置 UTinyint 值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：UTinyint 值。

- `func (p *Param) SetUSmallint(offset int, value uint)`
  - **接口说明**：设置 USmallint 值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：USmallint 值。

- `func (p *Param) SetUInt(offset int, value uint)`
  - **接口说明**：设置 UInt 值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：UInt 值。

- `func (p *Param) SetUBigint(offset int, value uint)`
  - **接口说明**：设置 UBigint 值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：UBigint 值。

- `func (p *Param) SetFloat(offset int, value float32)`
  - **接口说明**：设置 Float 值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：Float 值。

- `func (p *Param) SetDouble(offset int, value float64)`
  - **接口说明**：设置 Double 值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：Double 值。

- `func (p *Param) SetBinary(offset int, value []byte)`
  - **接口说明**：设置 Binary 值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：Binary 值。

- `func (p *Param) SetVarBinary(offset int, value []byte)`
  - **接口说明**：设置 VarBinary 值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：VarBinary 值。

- `func (p *Param) SetNchar(offset int, value string)`
  - **接口说明**：设置 Nchar 值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：Nchar 值。

- `func (p *Param) SetTimestamp(offset int, value time.Time, precision int)`
  - **接口说明**：设置 Timestamp 值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：Timestamp 值。
    - `precision`：时间精度。

- `func (p *Param) SetJson(offset int, value []byte)`
  - **接口说明**：设置 Json 值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：Json 值。

- `func (p *Param) SetGeometry(offset int, value []byte)`
  - **接口说明**：设置 Geometry 值。
  - **参数说明**：
    - `offset`：偏移量 (列或标签)。
    - `value`：Geometry 值。

以下是链式调用设置参数的接口：

- `func (p *Param) AddBool(value bool) *Param`
  - **接口说明**：添加布尔值。
  - **参数说明**：
    - `value`：布尔值。
  - **返回值**：Param 对象。

其他类型与布尔值类似，具体接口如下：

- AddNull
- AddTinyint
- AddSmallint
- AddInt
- AddBigint
- AddUTinyint
- AddUSmallint
- AddUInt
- AddUBigint
- AddFloat
- AddDouble
- AddBinary
- AddVarBinary
- AddNchar
- AddTimestamp
- AddJson
- AddGeometry

以下是设置列类型信息的接口：

- `func NewColumnType(size int) *ColumnType`
  - **接口说明**：创建一个列类型信息数据结构。
  - **参数说明**：
    - `size`：列数量。
  - **返回值**：ColumnType 对象。

- `func (c *ColumnType) AddBool() *ColumnType`
  - **接口说明**：添加布尔类型。
  - **返回值**：ColumnType 对象。

其他类型与布尔类型类似，具体接口如下：

- AddTinyint
- AddSmallint
- AddInt
- AddBigint
- AddUTinyint
- AddUSmallint
- AddUInt
- AddUBigint
- AddFloat
- AddDouble
- AddBinary
- AddVarBinary
- AddNchar
- AddTimestamp
- AddJson
- AddGeometry

### 数据订阅

Go 驱动支持数据订阅功能，提供了基于原生连接和 WebSocket 连接的数据订阅接口。原生实现在 `af/tmq` 包中，WebSocket 实现在 `ws/tmq` 包中。

#### 消费者

- `func NewConsumer(conf *tmq.ConfigMap) (*Consumer, error)`
  - **接口说明**：创建一个消费者。
  - **参数说明**：
    - `conf`：配置信息。
  - **返回值**：Consumer 对象，错误信息。

配置信息定义为：

```go
type ConfigValue interface{}
type ConfigMap map[string]ConfigValue
```

创建消费者支持属性列表：

- `ws.url`：WebSocket 连接地址。
- `ws.message.channelLen`：WebSocket 消息通道缓存长度，默认 0。
- `ws.message.timeout`：WebSocket 消息超时时间，默认 5m。
- `ws.message.writeWait`：WebSocket 写入消息超时时间，默认 10s。
- `ws.message.enableCompression`：WebSocket 是否启用压缩，默认 false。
- `ws.autoReconnect`：WebSocket 是否自动重连，默认 false。
- `ws.reconnectIntervalMs`：WebSocket 重连间隔时间毫秒，默认 2000。
- `ws.reconnectRetryCount`：WebSocket 重连重试次数，默认 3。
- `timezone`：订阅结果时间类型解析使用的时区，使用 IANA 时区格式，例如：`Asia/Shanghai`（v3.7.4 及以上版本支持）。

其他参数请参考：[Consumer 参数列表](../../../develop/tmq/#创建参数)，注意 TDengine TSDB 服务端自 3.2.0.0 版本开始消息订阅中的 auto.offset.reset 默认值发生变化。

- `func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error`
  - **接口说明**：订阅主题。
  - **参数说明**：
    - `topic`：主题。
    - `rebalanceCb`：平衡回调（未使用）。
  - **返回值**：错误信息。

- `func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) error`
  - **接口说明**：订阅主题列表。
  - **参数说明**：
    - `topics`：主题列表。
    - `rebalanceCb`：平衡回调（未使用）。
  - **返回值**：错误信息。

- `func (c *Consumer) Unsubscribe() error`
  - **接口说明**：取消订阅。
  - **返回值**：错误信息。

- `func (c *Consumer) Poll(timeoutMs int) tmq.Event`
  - **接口说明**：轮询事件。
  - **参数说明**：
    - `timeoutMs`：超时时间。
  - **返回值**：事件。

- `func (c *Consumer) Commit() ([]tmq.TopicPartition, error)`
  - **接口说明**：提交偏移量。
  - **返回值**：TopicPartition 列表，错误信息。

- `func (c *Consumer) Assignment() (partitions []tmq.TopicPartition, err error)`
  - **接口说明**：获取分配信息。
  - **返回值**：TopicPartition 列表，错误信息。

- `func (c *Consumer) Seek(partition tmq.TopicPartition, ignoredTimeoutMs int) error`
  - **接口说明**：跳转到偏移量。
  - **参数说明**：
    - `partition`：分区和偏移信息。
    - `ignoredTimeoutMs`：超时时间（未使用）。
  - **返回值**：错误信息。

- `func (c *Consumer) Committed(partitions []tmq.TopicPartition, timeoutMs int) (offsets []tmq.TopicPartition, err error)`
  - **接口说明**：获取提交的偏移量。
  - **参数说明**：
    - `partitions`：分区列表。
    - `timeoutMs`：超时时间。
  - **返回值**：TopicPartition 列表，错误信息。

- `func (c *Consumer) CommitOffsets(offsets []tmq.TopicPartition) ([]tmq.TopicPartition, error)`
  - **接口说明**：提交偏移量。
  - **参数说明**：
    - `offsets`：偏移量列表。
  - **返回值**：TopicPartition 列表，错误信息。

- `func (c *Consumer) Position(partitions []tmq.TopicPartition) (offsets []tmq.TopicPartition, err error)`
  - **接口说明**：获取当前偏移量。
  - **参数说明**：
    - `partitions`：分区列表。
  - **返回值**：TopicPartition 列表，错误信息。

- `func (c *Consumer) Close() error`
  - **接口说明**：关闭消费者。
  - **返回值**：错误信息。

#### 消费记录

当 `Poll` 返回 `tmq.Event` 事件时，可以通过判断 `tmq.Event` 的类型获取消费记录或错误信息。当类型为 `*tmq.DataMessage` 时，可以获取消费记录。

- `func (m *DataMessage) Topic() string`
  - **接口说明**：获取主题。
  - **返回值**：主题。

- `func (m *DataMessage) DBName() string`
  - **接口说明**：获取数据库名称。
  - **返回值**：数据库名称。

- `func (m *DataMessage) Offset() Offset`
  - **接口说明**：获取偏移量。
  - **返回值**：偏移量。

- `func (m *DataMessage) Value() interface{}`
  - **接口说明**：获取值，具体值为 `[]*tmq.data`。
  - **返回值**：消费到的值。

tmq.data 结构如下：

```go
type Data struct {
 TableName string
 Data      [][]driver.Value
}
```

- TableName 为表名
- Data 为数据，每个元素为一行数据，每行数据为一个数组，数组元素为列值。

当 Poll 返回类型为 `tmq.Error` 时，可以使用 `func (e Error) Error() string` 获取错误信息。

#### 分区信息

当消费到数据类型为 `*tmq.DataMessage` 时，可以从 `TopicPartition` 属性中获取分区信息。

```go
type TopicPartition struct {
 Topic     *string
 Partition int32
 Offset    Offset
 Metadata  *string
 Error     error
}
```

- `Topic`：主题。
- `Partition`：分区。
- `Offset`：偏移量。
- `Metadata`：元数据（未使用）。
- `Error`：错误信息。

可以使用 `func (p TopicPartition) String() string` 获取分区信息。

#### 偏移量元数据

从 `TopicPartition` 中获取的偏移量信息，可以通过 `Offset` 属性获取偏移量元数据。当偏移量为 `-2147467247` 时表示未设置偏移量。

#### 反序列化

当消费到数据类型为 `*tmq.DataMessage` 时，可以使用 `func (m *DataMessage) Value() interface{}` 获取数据，数据类型为 `[]*tmq.data` 。

## 附录

- [driver-go 文档](https://pkg.go.dev/github.com/taosdata/driver-go/v3)。
- [视频教程](https://www.taosdata.com/blog/2020/11/11/1951.html)。
